/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.iterate;

import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_WAIT_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_SIZE;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.io.output.DeferredFileOutputStream;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.monitoring.MemoryMetricsHolder;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ResultUtil;
import org.apache.phoenix.util.TupleUtil;

/**
 * Result iterator that spools the results of a scan to disk once an in-memory threshold has been
 * reached. If the in-memory threshold is not reached, the results are held in memory with no disk
 * writing performed.
 * <p>
 * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase
 * 0.98.17, we rely on pacing the server side scanners instead of pulling rows from the server and
 * potentially spooling to a temporary file created on clients.
 * </p>
 * @since 0.1
 */
@Deprecated
public class SpoolingResultIterator implements PeekingResultIterator {

  private final PeekingResultIterator spoolFrom;
  private final SpoolingMetricsHolder spoolMetrics;
  private final MemoryMetricsHolder memoryMetrics;

  /**
   * Factory producing a {@link SpoolingResultIterator} for each scanner it is handed.
   * <p>
   * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase
   * 0.98.17, we rely on pacing the server side scanners instead of pulling rows from the server and
   * potentially spooling to a temporary file created on clients.
   * </p>
   */
  @Deprecated
  public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
    private final QueryServices services;

    /**
     * @param services query services supplying the memory manager and the spooling properties
     */
    public SpoolingResultIteratorFactory(QueryServices services) {
      this.services = services;
    }

    /**
     * Wraps {@code scanner} in a spooling iterator whose spool and memory metrics are registered
     * against {@code physicalTableName} on the statement context's read metrics queue. The
     * {@code scan} and {@code plan} arguments are not used here.
     */
    @Override
    public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
      Scan scan, String physicalTableName, QueryPlan plan) throws SQLException {
      final ReadMetricQueue metricQueue = context.getReadMetricsQueue();
      return new SpoolingResultIterator(new SpoolingMetricsHolder(metricQueue, physicalTableName),
        new MemoryMetricsHolder(metricQueue, physicalTableName), scanner, services);
    }
  }

  /**
   * Convenience constructor used by the factory. Takes the memory manager from {@code services} and
   * reads the spool threshold, the maximum spool-to-disk size and the spool directory from the
   * service properties (falling back to the {@link QueryServicesOptions} defaults), then delegates
   * to the package-private constructor.
   * @param spoolMetrics  holder for the spool file metrics
   * @param memoryMetrics holder for the memory metrics
   * @param scanner       the results of a table scan
   * @param services      source of the memory manager and configuration properties
   */
  private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics,
    MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services)
    throws SQLException {
    this(spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
      services.getProps().getLongBytes(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES),
      services.getProps().getLongBytes(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB,
        QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
      services.getProps().get(QueryServices.SPOOL_DIRECTORY,
        QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
  }

  /**
   * Create a result iterator by iterating through the results of a scan, spooling them to disk once
   * a threshold has been reached. The scanner passed in is closed prior to returning.
   * <p>
   * A {@code SpoolTooBigToDiskException} is raised as soon as the number of bytes written exceeds
   * {@code thresholdBytes + maxSpoolToDisk} (unless {@code maxSpoolToDisk} is -1). On any failure
   * the partially written spool file, if one was created, is deleted and the memory chunk is
   * closed.
   * </p>
   * @param sMetrics       holder for the spool file count and size metrics
   * @param mMetrics       holder for the memory wait time and chunk size metrics
   * @param scanner        the results of a table scan
   * @param mm             memory manager tracking memory usage across threads.
   * @param thresholdBytes the requested threshold. Will be dialed down if memory usage (as
   *                       determined by the memory manager) is exceeded.
   * @param maxSpoolToDisk number of bytes allowed on top of {@code thresholdBytes}, or -1 for no
   *                       limit
   * @param spoolDirectory directory in which the temporary spool file is created
   * @throws SQLException if the scanner fails or an I/O error occurs while spooling
   */
  SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics,
    ResultIterator scanner, MemoryManager mm, final long thresholdBytes, final long maxSpoolToDisk,
    final String spoolDirectory) throws SQLException {
    this.spoolMetrics = sMetrics;
    this.memoryMetrics = mMetrics;
    boolean success = false;
    // Time the allocation call and report it as memory wait time.
    long startTime = EnvironmentEdgeManager.currentTimeMillis();
    final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
    long waitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
    GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
    memoryMetrics.getMemoryWaitTimeMetric().change(waitTime);
    DeferredFileOutputStream spoolTo = null;
    try {
      // The stream's in-memory threshold is an int while the configured threshold is a long, so
      // clamp instead of letting an oversized chunk wrap around to a bogus (negative) threshold.
      int size = (int) Math.min(chunk.getSize(), Integer.MAX_VALUE);
      spoolTo =
        new DeferredFileOutputStream(size, "ResultSpooler", ".bin", new File(spoolDirectory)) {
          @Override
          protected void thresholdReached() throws IOException {
            try {
              super.thresholdReached();
            } finally {
              // Output has switched to the spool file, so the memory chunk is no longer needed.
              chunk.close();
            }
          }
        };
      DataOutputStream out = new DataOutputStream(spoolTo);
      final long maxBytesAllowed =
        maxSpoolToDisk == -1 ? Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk;
      long bytesWritten = 0L;
      for (Tuple result = scanner.next(); result != null; result = scanner.next()) {
        int length = TupleUtil.write(result, out);
        bytesWritten += length;
        if (bytesWritten > maxBytesAllowed) {
          throw new SpoolTooBigToDiskException(
            "result too big, max allowed(bytes): " + maxBytesAllowed);
        }
      }
      if (spoolTo.isInMemory()) {
        byte[] data = spoolTo.getData();
        // Shrink the chunk to what is actually held in memory.
        chunk.resize(data.length);
        spoolFrom = new InMemoryResultIterator(data, chunk);
        GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
        memoryMetrics.getMemoryChunkSizeMetric().change(data.length);
      } else {
        // Fetch the spool file once; it exists because the threshold has been crossed.
        final File spoolFile = spoolTo.getFile();
        long sizeOfSpoolFile = spoolFile.length();
        GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
        GLOBAL_SPOOL_FILE_COUNTER.increment();
        spoolMetrics.getNumSpoolFileMetric().increment();
        spoolMetrics.getSpoolFileSizeMetric().change(sizeOfSpoolFile);
        spoolFrom = new OnDiskResultIterator(spoolFile);
        // Safety net in case the iterator is never read to the end or closed.
        spoolFile.deleteOnExit();
      }
      success = true;
    } catch (IOException e) {
      throw ClientUtil.parseServerException(e);
    } finally {
      try {
        scanner.close();
      } finally {
        try {
          if (spoolTo != null) {
            try {
              spoolTo.close();
            } finally {
              // Delete only after the stream is closed: some platforms refuse to delete a file
              // that is still open, which would leave the partial spool file behind.
              File partialSpool = success ? null : spoolTo.getFile();
              if (partialSpool != null) {
                partialSpool.delete();
              }
            }
          }
        } catch (IOException ignored) {
          // ignore close error
        } finally {
          if (!success) {
            chunk.close();
          }
        }
      }
    }
  }

  /**
   * Delegates to the backing iterator (in-memory or on-disk, chosen by the constructor) and returns
   * whatever its {@code peek()} returns.
   */
  @Override
  public Tuple peek() throws SQLException {
    return spoolFrom.peek();
  }

  /**
   * Delegates to the backing iterator (in-memory or on-disk, chosen by the constructor) and returns
   * whatever its {@code next()} returns.
   */
  @Override
  public Tuple next() throws SQLException {
    return spoolFrom.next();
  }

  /**
   * Closes the backing iterator, which in turn closes the memory chunk (in-memory case) or closes
   * and deletes the spool file (on-disk case).
   */
  @Override
  public void close() throws SQLException {
    spoolFrom.close();
  }

  /**
   * Backing iterator used when all results fit in memory and nothing was spooled to disk. The byte
   * array holds consecutive records, each a vint length followed by that many bytes of serialized
   * result; one record is decoded ahead of time so it can be peeked. Closing the iterator closes
   * the associated memory chunk.
   * @since 0.1
   */
  private static class InMemoryResultIterator implements PeekingResultIterator {
    private final MemoryChunk chunk;
    private final byte[] data;
    private Tuple upcoming;
    private int cursor;

    private InMemoryResultIterator(byte[] bytes, MemoryChunk memoryChunk) throws SQLException {
      this.data = bytes;
      this.chunk = memoryChunk;
      // Decode the first record up front so peek() has something to return.
      advance();
    }

    /**
     * Decodes the record at the cursor into {@code upcoming} and moves the cursor past it, or sets
     * {@code upcoming} to null once the whole array has been consumed.
     */
    private Tuple advance() throws SQLException {
      if (cursor < data.length) {
        final int payloadLength = ByteUtil.vintFromBytes(data, cursor);
        final int payloadStart = cursor + WritableUtils.getVIntSize(payloadLength);
        cursor = payloadStart + payloadLength;
        final ImmutableBytesWritable record =
          new ImmutableBytesWritable(data, payloadStart, payloadLength);
        upcoming = new ResultTuple(ResultUtil.toResult(record));
      } else {
        upcoming = null;
      }
      return upcoming;
    }

    /** Returns the pre-decoded tuple without advancing, or null when exhausted. */
    @Override
    public Tuple peek() throws SQLException {
      return upcoming;
    }

    /** Returns the pre-decoded tuple and decodes the one after it. */
    @Override
    public Tuple next() throws SQLException {
      final Tuple served = upcoming;
      advance();
      return served;
    }

    @Override
    public void close() {
      chunk.close();
    }

    /** No-op. */
    @Override
    public void explain(List<String> planSteps) {
    }

    /** No-op. */
    @Override
    public void explain(List<String> planSteps,
      ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
    }
  }

  /**
   * Backing result iterator if results were spooled to disk. The spool file is opened lazily on the
   * first {@code peek()}/{@code next()} and is closed and deleted once the end of the file is
   * reached or {@code close()} is called. The file holds consecutive records, each a vint length
   * followed by that many bytes of serialized result.
   * @since 0.1
   */
  private static class OnDiskResultIterator implements PeekingResultIterator {
    private final File file;
    private DataInputStream spoolFrom;
    private Tuple next;
    private boolean isClosed;

    private OnDiskResultIterator(File file) {
      this.file = file;
    }

    /**
     * Opens the spool file and reads the first record, unless that has already happened or the
     * iterator has been closed.
     */
    private synchronized void init() throws IOException {
      // Never (re)open after close(): the spool file has already been deleted by then, so the
      // iterator must simply report exhaustion like it does when closed after initialization.
      if (spoolFrom == null && !isClosed) {
        spoolFrom =
          new DataInputStream(new BufferedInputStream(Files.newInputStream(file.toPath())));
        advance();
      }
    }

    /** Marks the iterator exhausted, closes the stream if it was opened and deletes the file. */
    private synchronized void reachedEnd() throws IOException {
      next = null;
      isClosed = true;
      try {
        if (spoolFrom != null) {
          spoolFrom.close();
        }
      } finally {
        file.delete();
      }
    }

    /**
     * Reads the next record from the spool file into {@code next}. Hitting end-of-file, either
     * before the length prefix or in the middle of a record, ends the iteration.
     */
    private synchronized Tuple advance() throws IOException {
      if (isClosed) {
        return next;
      }
      int length;
      try {
        length = WritableUtils.readVInt(spoolFrom);
      } catch (EOFException e) {
        // Normal termination: no further record follows.
        reachedEnd();
        return next;
      }
      byte[] buffer = new byte[length];
      int totalBytesRead = 0;
      while (totalBytesRead < length) {
        // read() may return fewer bytes than requested, so ask only for what is still missing;
        // requesting the full length again would run past the end of the buffer.
        int bytesRead = spoolFrom.read(buffer, totalBytesRead, length - totalBytesRead);
        if (bytesRead == -1) {
          reachedEnd();
          return next;
        }
        totalBytesRead += bytesRead;
      }
      next = new ResultTuple(ResultUtil.toResult(new ImmutableBytesWritable(buffer, 0, length)));
      return next;
    }

    /** Returns the pending tuple without consuming it, or null when exhausted or closed. */
    @Override
    public synchronized Tuple peek() throws SQLException {
      try {
        init();
        return next;
      } catch (IOException e) {
        throw ClientUtil.parseServerException(e);
      }
    }

    /** Returns the pending tuple and reads the following one, or null when exhausted or closed. */
    @Override
    public synchronized Tuple next() throws SQLException {
      try {
        init();
        Tuple current = next;
        advance();
        return current;
      } catch (IOException e) {
        throw ClientUtil.parseServerException(e);
      }
    }

    /** Closes the stream and deletes the spool file; does nothing if already closed. */
    @Override
    public synchronized void close() throws SQLException {
      try {
        if (!isClosed) {
          reachedEnd();
        }
      } catch (IOException e) {
        throw ClientUtil.parseServerException(e);
      }
    }

    /** No-op. */
    @Override
    public void explain(List<String> planSteps) {
    }

    /** No-op. */
    @Override
    public void explain(List<String> planSteps,
      ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
    }
  }

  /** No-op: this iterator adds nothing to the given plan steps. */
  @Override
  public void explain(List<String> planSteps) {
  }

  /** No-op: neither the plan steps nor the explain plan attributes builder are modified. */
  @Override
  public void explain(List<String> planSteps,
    ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
  }
}
